package com.parser.sixoo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.*;

public class Parser611 {

    /** SLF4J logger for this class. */
    private final static Logger LOGGER = LoggerFactory.getLogger(Parser611.class);

    /** Worker pool; assigned by {@link #init()} and used by {@link #main(String[])} to run the parse workers. */
    private static ThreadPoolExecutor service;

    /** Bounded hand-off queue (capacity 10000): {@link FBThread} enqueues packets and {@link parseWorker} polls them. */
    private final static LinkedBlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>(10000);
    // queueA and queueB are created without a capacity argument and are not referenced anywhere else in this class.
    private final static LinkedBlockingQueue<ByteBuffer> queueA = new LinkedBlockingQueue<>();
    private final static LinkedBlockingQueue<ByteBuffer> queueB = new LinkedBlockingQueue<>();

    /** Identifier table with 15000 slots, filled by {@link #genIDS()}. */
    private static final String[] ids = new String[15000];
    /**
     * Decoded values. Every {@link parseWorker} writes into this single shared array
     * without synchronization.
     * NOTE(review): concurrent workers overwrite each other's results here - confirm the intended ownership.
     */
    private static final float[] values = new float[15000];

    /**
     * Populates {@code ids} with sequential identifiers: the letter {@code p}
     * followed by the 1-based slot number, left-padded with zeros to five digits
     * ({@code p00001}, {@code p00002}, ...).
     */
    private static void genIDS() {
        final int width = 5;
        for (int slot = 0; slot < ids.length; slot++) {
            String number = Integer.toString(slot + 1);
            if (number.length() > width) {
                // Numbers wider than five digits receive no identifier.
                continue;
            }
            StringBuilder id = new StringBuilder(width + 1).append('p');
            for (int pad = number.length(); pad < width; pad++) {
                id.append('0');
            }
            ids[slot] = id.append(number).toString();
        }
    }


    /**
     * Reads an entire file into a byte array through a read-only memory mapping
     * ({@link MappedByteBuffer}), which can improve performance for large files.
     *
     * <p>Both the {@link RandomAccessFile} and its channel are closed before the
     * method returns, whether it succeeds or fails. Failures are reported only
     * through the thrown exception.
     *
     * @param filename path of the file to read
     * @return the complete file content; an empty array for an empty file
     * @throws IOException if the file cannot be opened or mapped, or if it is
     *                     larger than {@link Integer#MAX_VALUE} bytes and therefore
     *                     cannot fit in a single array
     */
    public static byte[] toByteArrayforBigFile(String filename) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(filename, "r");
             FileChannel channel = file.getChannel()) {
            long size = channel.size();
            if (size > Integer.MAX_VALUE) {
                throw new IOException("File too large to load into a byte array ("
                        + size + " bytes): " + filename);
            }
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, size);
            byte[] result = new byte[(int) size];
            // A freshly mapped buffer has position 0 and limit == size, so this copies everything.
            mapped.get(result);
            return result;
        }
    }

    /**
     * Prepares the parser: fills the identifier table and creates the worker pool.
     *
     * <p>The pool has 20 core threads, at most 40 threads and a work queue of 100
     * entries. When the pool is saturated, the rejection handler blocks the
     * submitting thread until the work queue has room instead of discarding the task.
     *
     * <p>The handler throws {@link RejectedExecutionException} when the pool has
     * already been shut down (a queued task would never run) or when the submitting
     * thread is interrupted while waiting; in the latter case the interrupt flag is
     * restored so the caller can observe it.
     */
    public static void init() {
        genIDS();

        service = new ThreadPoolExecutor(20, 40, 1000000,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(100),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                        if (threadPoolExecutor.isShutdown()) {
                            throw new RejectedExecutionException(
                                    "Executor has been shut down; task rejected: " + runnable);
                        }
                        try {
                            // Block the submitter until the work queue has a free slot.
                            threadPoolExecutor.getQueue().put(runnable);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            LOGGER.error("重新加入任务队列！", e);
                            throw new RejectedExecutionException(
                                    "Interrupted while waiting to enqueue task: " + runnable, e);
                        }
                    }
                });
    }


    /**
     * Entry point: loads the data file, skips its 512-byte file header, starts the
     * packet-splitting thread and submits 20 parse workers to the pool.
     *
     * <p>The printed elapsed time covers only starting the pipeline; splitting and
     * parsing continue asynchronously after it is printed.
     *
     * @param args optional; {@code args[0]} is the path of the data file. When absent,
     *             the built-in default path is used.
     * @throws IOException if the file cannot be read or is shorter than the file header
     */
    public static void main(String[] args) throws IOException {
        final String defaultPath =
                "/home/hdfs/soft/IdeaProjects/dec-kks-etl/parse-data-611/src/main/resources/1.dat";
        final int fileHeaderLength = 512;

        String path = (args != null && args.length > 0) ? args[0] : defaultPath;

        init();

        byte[] bytes = toByteArrayforBigFile(path);
        if (bytes.length < fileHeaderLength) {
            throw new IOException("File is shorter than the " + fileHeaderLength
                    + "-byte file header (" + bytes.length + " bytes): " + path);
        }

        long start = System.currentTimeMillis();

        ByteBuffer buf = ByteBuffer.wrap(bytes);

        // File header: not interpreted yet, so just step over it.
        buf.position(fileHeaderLength);

        // Split the remaining bytes into packets.
        new FBThread(buf).start();

        // Parse the packets.
        for (int i = 0; i < 20; i++) {
            service.submit(new parseWorker());
        }

        // Assemble objects (not implemented yet).

        // Store (not implemented yet).

        long end = System.currentTimeMillis();
        System.out.println("耗时：" + (end - start));
    }


    /**
     * Renders a byte array as an upper-case hexadecimal string, two characters
     * per byte and without separators.
     *
     * @param buf bytes to render; must not be {@code null}
     * @return the hexadecimal text, empty when {@code buf} is empty
     */
    public static String parseByte2HexStr(byte buf[]) {
        final char[] digits = "0123456789ABCDEF".toCharArray();
        char[] out = new char[buf.length * 2];
        int pos = 0;
        for (byte b : buf) {
            int unsigned = b & 0xFF;
            out[pos++] = digits[unsigned >>> 4];
            out[pos++] = digits[unsigned & 0x0F];
        }
        return new String(out);
    }


    /**
     * Packet-splitting thread: cuts the source buffer into fixed-size packets and
     * hands each one to the shared parse queue.
     *
     * <p>The thread blocks while the queue is full. It stops early when it is
     * interrupted, leaving the interrupt flag set. Trailing bytes that do not
     * form a complete packet are reported on standard error and skipped.
     */
    static class FBThread extends Thread {

        /** Size in bytes of one packet placed on the queue. */
        private static final int PACKET_LENGTH = 13512;

        private final ByteBuffer buf;

        /**
         * @param buf buffer positioned at the first packet; it is consumed by {@link #run()}
         */
        public FBThread(ByteBuffer buf) {
            this.buf = buf;
        }

        @Override
        public void run() {
            int i = 0;
            // Only take complete packets so a truncated tail cannot cause a buffer underflow.
            while (buf.remaining() >= PACKET_LENGTH) {
                System.out.println("分包次数：" + i++);
                byte[] pack_data = new byte[PACKET_LENGTH];
                buf.get(pack_data, 0, PACKET_LENGTH);
                try {
                    // Blocks until the bounded queue has room.
                    queue.put(ByteBuffer.wrap(pack_data));
                } catch (InterruptedException e) {
                    // Preserve the interrupt for whoever asked this thread to stop.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            if (buf.hasRemaining()) {
                System.err.println("Ignoring " + buf.remaining()
                        + " trailing bytes that do not form a complete "
                        + PACKET_LENGTH + "-byte packet");
            }
        }
    }

    /**
     * Parsing worker: repeatedly takes a packet from the shared queue and decodes
     * its values into the shared {@code values} array.
     *
     * <p>Each packet is a 12-byte header followed by 13500 bytes of 4-byte values.
     * Values at index 3001 and above are decoded as integers, the rest as floats.
     * The worker returns when the executing thread is interrupted, so
     * {@code shutdownNow()} on the pool stops it.
     *
     * <p>NOTE(review): all workers write into the same {@code values} array without
     * synchronization, so concurrent packets overwrite each other - confirm the
     * intended ownership before consuming these values.
     */
    static class parseWorker extends Thread {

        /**
         * Bytes of packet header preceding the values. Presumably two 2-byte channel
         * fields, a 2-byte channel flag, a 2-byte data length and a 4-byte timestamp -
         * TODO confirm against the format specification. The header is not used yet.
         */
        private static final int HEADER_LENGTH = 12;

        /** Bytes of value data following the header. */
        private static final int BODY_LENGTH = 13500;

        /** Width in bytes of one encoded value. */
        private static final int VALUE_LENGTH = 4;

        /**
         * Values at an index greater than this are decoded as integers.
         * NOTE(review): index 3000 itself is decoded as a float - confirm the boundary.
         */
        private static final int LAST_FLOAT_INDEX = 3000;

        @Override
        public void run() {
            // This object is executed as a Runnable by a pool thread, so the interrupt
            // state that matters is the executing thread's, not this Thread object's.
            while (!Thread.currentThread().isInterrupted()) {
                ByteBuffer buf;
                try {
                    buf = queue.poll(1000, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (buf == null) {
                    // Timed out with nothing queued; keep waiting.
                    continue;
                }
                // Only complete packets are decoded, so a short buffer cannot underflow.
                while (buf.remaining() >= HEADER_LENGTH + BODY_LENGTH) {
                    System.out.println(Thread.currentThread() + "解析包：" + buf.array().length);
                    parsePacket(buf);
                }
            }
        }

        /** Skips the packet header and decodes every value of the packet body into {@code values}. */
        private void parsePacket(ByteBuffer buf) {
            buf.position(buf.position() + HEADER_LENGTH);

            byte[] pv = new byte[VALUE_LENGTH];
            int valueCount = BODY_LENGTH / VALUE_LENGTH;
            for (int j = 0; j < valueCount; j++) {
                buf.get(pv, 0, VALUE_LENGTH);
                if (j > LAST_FLOAT_INDEX) {
                    values[j] = ByteUtil611.byte2intWithB(pv);
                } else {
                    values[j] = ByteUtil611.byte2floatWithB(pv);
                }
            }
        }
    }

}
